/**
 * Copyright [2019] [LiBo/Alex of copyright liboware@gmail.com ]
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package report.split;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.thread.ThreadUtil;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.lhy.report.bean.MigrationDataParam;
import com.lhy.report.bean.MigrationProcessResponse;
import com.lhy.report.redis.HashRedisPartitionQueueManager;
import com.lhy.report.tools.AbsolutelyAverageDateRangeSplitter;
import com.lhy.report.tools.DataRangeSplitter;
import com.lhy.report.tools.DateRangeSplitter;
import com.lhy.report.tools.PageableAvergeDataRangeSplitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;

import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @project-name:wiz-sound-ai3
 * @package-name:com.wiz.soundai.task.migration
 * @author:LiBo/Alex
 * @create-date:2021-09-18 11:48
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description: 迁移数据业务服务实现类
 */
@Slf4j
public abstract class AbstractMigrationDataService implements MigrationDataService {


    @Autowired
    private HashRedisPartitionQueueManager redisStatisticsManager;

    /**
     * 执行业务逻辑线程池
     */
    protected static ExecutorService DEFAULT_THREAD_POOL;

    /**
     * Splits {@code dataList} into partitions and submits one processing task per
     * partition to the given {@link CompletionService}. May be overridden to
     * customize the core processing loop (noted: not LSP-clean, but extensible).
     *
     * @param migrationCallRecordParam migration parameters (rate limit, mock flag, balance flag)
     * @param completionService        service the per-partition tasks are submitted to
     * @param dataList                 the complete data set for this round (passed through to executeDataFunction)
     * @param success                  shared counter incremented once per processed entity
     * @param failure                  shared counter incremented once per failed entity
     * @return the partitioned data, one sub-list per submitted task
     */
    protected List<List> processDataFunction(MigrationDataParam migrationCallRecordParam,
                                             CompletionService completionService,List dataList ,AtomicInteger success,AtomicInteger failure){
        //TODO future work: adaptive/dynamic sharding (dynamic programming based)
        List<List> dataSingleTaskData = splitDataPartitionNumber(dataList,migrationCallRecordParam);
        for (List dataTaskData : dataSingleTaskData) {
            completionService.submit(() -> {
                int localCallCount = 0;
                // Rate limiter per task, e.g. 5000 permits/second with a 500ms warm-up period.
                RateLimiter rateLimiter = RateLimiter.create(migrationCallRecordParam.getProcessRateValue(),
                        500, TimeUnit.MILLISECONDS);
                for (Object entity : dataTaskData) {
                    try {
                        // Throttle: one permit per entity.
                        rateLimiter.acquire(1);
                        if (!migrationCallRecordParam.isMockOperation()) {
                            processNextStepSlink(executeDataFunction(migrationCallRecordParam, dataList, entity,success,failure));
                        }
                        success.incrementAndGet();
                        // Every processRateValue entities, optionally pause to balance throughput.
                        // TODO: coordinate with downstream consumption speed.
                        if (Math.floorMod(localCallCount++ , migrationCallRecordParam.getProcessRateValue()) == 0
                                && migrationCallRecordParam.isBalanceRate()) {
                            Thread.sleep(500);
                        }
                    } catch (InterruptedException e) {
                        failure.incrementAndGet();
                        // FIX: restore the interrupt flag so the executor can observe cancellation.
                        Thread.currentThread().interrupt();
                        log.error("execute data failure", e);
                    } catch (Exception e) {
                        failure.incrementAndGet();
                        // FIX: was log.info with misspelled "faliure"; failures belong at ERROR level.
                        log.error("execute data failure", e);
                    }
                }
                // Optional trailing pause when balancing is enabled.
                if (migrationCallRecordParam.isBalanceRate()) {
                    Thread.sleep(500);
                }
                return localCallCount;
            });
        }
        return dataSingleTaskData;
    }


    /**
     * Lazily creates the shared fallback thread pool on first use.
     * Synchronized so concurrent first callers cannot create two pools.
     *
     * @return the shared executor (10 core / 20 max threads, queue capacity 200)
     */
    private static synchronized ExecutorService lazyLoadTheSystemThreadPool() {
        if (DEFAULT_THREAD_POOL == null) {
            DEFAULT_THREAD_POOL = ThreadUtil.newExecutor(10, 20, 200);
        }
        return DEFAULT_THREAD_POOL;
    }


    /**
     * Migrates data over an explicit [start, end] time range: the range is split
     * into fixed-size day intervals and each interval is fetched and processed
     * independently, with per-interval progress logging.
     *
     * @param migrationCallRecordParam migration parameters; compute start/end time are required
     * @param success                  shared success counter propagated to the workers
     * @param failure                  shared failure counter propagated to the workers
     * @return total number of records processed across all sub-ranges
     */
    private int migrationDataForTargetDateRange(MigrationDataParam migrationCallRecordParam, AtomicInteger success, AtomicInteger failure) {
        Objects.requireNonNull(migrationCallRecordParam.getComputeStartTime(), "date start time is must be not null!");
        Objects.requireNonNull(migrationCallRecordParam.getComputeEndTime(), "date end time is must be not null!");
        Map<DateRangeSplitter.DateRangeSplit, Integer> dateRangeSplitIntegerMap = new ConcurrentHashMap<>();
        log.info("all in migration date range ：{}  {} ", migrationCallRecordParam.getComputeStartTime(), migrationCallRecordParam.getComputeEndTime());
        // Running total of fetched records, used only for progress logging.
        int callCount = 0;
        // Parameter object covering the whole range, used for the overall count query.
        MigrationDataParam migrationCallRecordParam1 = new MigrationDataParam(migrationCallRecordParam.getDataType(), Date.from(migrationCallRecordParam.getComputeStartTime().atZone(ZoneId.systemDefault()).toInstant()), Date.from(migrationCallRecordParam.getComputeEndTime().atZone(ZoneId.systemDefault()).toInstant()));
        // Resolve (and cache) the overall record count for progress reporting.
        Long allCount = migrationCallRecordParam.getAllCount();
        if (Objects.isNull(allCount)) {
            allCount = Optional.ofNullable(fetchDataAllCount(migrationCallRecordParam1)).orElse(NumberUtils.LONG_ZERO);
            migrationCallRecordParam.setAllCount(allCount);
        }
        // Split the range into equally sized day intervals.
        AbsolutelyAverageDateRangeSplitter absolutelyAverageDateRangeSplitter = new AbsolutelyAverageDateRangeSplitter(migrationCallRecordParam.getDateSplitIntervalDays(), ChronoUnit.DAYS);
        List<DateRangeSplitter.DateRangeSplit> dateRangeSplits = absolutelyAverageDateRangeSplitter.splitDateByUnit(migrationCallRecordParam.getComputeStartTime(), migrationCallRecordParam.getComputeEndTime());
        log.info("compute the data split is num:{}",dateRangeSplits.size());
        for (DateRangeSplitter.DateRangeSplit dateRangeSplit : dateRangeSplits) {
            // Currently only dataType-scoped sub-queries are supported.
            MigrationDataParam migrationCallRecordParamSub = new MigrationDataParam(migrationCallRecordParam.getDataType());
            migrationCallRecordParamSub.setProcessRateValue(migrationCallRecordParam.getProcessRateValue());
            migrationCallRecordParamSub.setDateRangeSplit(dateRangeSplit);
            migrationCallRecordParamSub.setDataRangeType(migrationCallRecordParam.getDataRangeType());
            migrationCallRecordParamSub.setMockOperation(migrationCallRecordParam.isMockOperation());
            // FIX: message had two placeholders but only one argument was supplied.
            log.info("【sub query parameter range 】：{}", migrationCallRecordParamSub);
            List dataList = fetchDataFunction(migrationCallRecordParamSub);
            // FIX: null/empty check before dereferencing dataList.size() (fetch may return null).
            if (CollectionUtil.isEmpty(dataList)) {
                continue;
            }
            log.info("【sub query parameter data num 】：{} ", dataList.size());
            callCount += dataList.size();
            // Core processing for this sub-range.
            int subResult = executeProcessResultCount(migrationCallRecordParamSub, dataList, success, failure);
            // Record per-interval result for the final sum.
            dateRangeSplitIntegerMap.put(dateRangeSplit, subResult);
            log.info("--------------------------------【migration sub process】: dateRangeSplit data model {}" + " -> plan execute number:{} -> local do num :{}   ->  current finished num {} -> remain do num {}------------------------" + "-------- ", dateRangeSplit, allCount, subResult, callCount, allCount - callCount);
        }
        return dateRangeSplitIntegerMap.values().stream().mapToInt(param -> param).sum();
    }

    /**
     * Migrates the records selected by an explicit collection of target business
     * ids. No sharding is applied yet: all matching data is fetched and processed
     * in a single batch. TODO: shard and track progress once volumes grow.
     *
     * @param migrationCallRecordParam migration parameters; targetBizCodes must be non-empty
     * @param success                  shared success counter propagated to the workers
     * @param failure                  shared failure counter propagated to the workers
     * @return number of records processed in this batch
     */
    private int migrationDataForTargetIds(MigrationDataParam migrationCallRecordParam, AtomicInteger success, AtomicInteger failure) {
        Preconditions.checkArgument(CollectionUtil.isNotEmpty(migrationCallRecordParam.getTargetBizCodes()), "date target codes is must be not null!");
        // Resolve (and cache) the overall record count used for progress reporting.
        Long allCount = migrationCallRecordParam.getAllCount();
        if (allCount == null) {
            allCount = Optional.ofNullable(fetchDataAllCount(migrationCallRecordParam)).orElse(NumberUtils.LONG_ZERO);
            migrationCallRecordParam.setAllCount(allCount);
        }
        // Single un-sharded fetch of everything matching the target ids.
        List dataList = fetchDataFunction(migrationCallRecordParam);
        if (CollectionUtil.isEmpty(dataList)) {
            return 0;
        }
        int callCount = dataList.size();
        int subResult = executeProcessResultCount(migrationCallRecordParam, dataList, success, failure);
        log.info("--------------------------------【migration sub process】:" + " -> plan execute number:{} -> local do num :{}   ->  current finished num {} -> remain do num {}------------------------" + "-------- ", allCount, subResult, callCount, allCount - callCount);
        return subResult;
    }


    /**
     * Migrates data by splitting the known total count into fixed-size index
     * ranges (cursor + offset) and processing each range in turn.
     *
     * @param migrationCallRecordParam migration parameters; allCount and processDataUnit are required
     * @param success                  shared success counter propagated to the workers
     * @param failure                  shared failure counter propagated to the workers
     * @return total number of records processed across all index ranges
     */
    private int migrationDataForDataRange(MigrationDataParam migrationCallRecordParam, AtomicInteger success, AtomicInteger failure) {
        Preconditions.checkNotNull(migrationCallRecordParam.getAllCount(), "date all count numbers is must be not null!");
        Preconditions.checkArgument(migrationCallRecordParam.getProcessDataUnit() > 0, "processRateValue is not null!");
        // Running total of fetched records, used only for progress logging.
        int callCount = 0;
        Long allCount = migrationCallRecordParam.getAllCount();
        // Split the total count into pageable index ranges.
        DataRangeSplitter dataRangeSplitter = new PageableAvergeDataRangeSplitter();
        List<DataRangeSplitter.DataNumberRange> dataRangeCollection = dataRangeSplitter.splitDataByUnit(migrationCallRecordParam.getAllCount(), migrationCallRecordParam.getProcessDataUnit().intValue());
        // Per-range result map, summed at the end.
        Map<DataRangeSplitter.DataNumberRange, Integer> dateRangeSplitIntegerMap = new ConcurrentHashMap<>();
        if (CollectionUtil.isEmpty(dataRangeCollection)) {
            return 0;
        }
        for (DataRangeSplitter.DataNumberRange dataNumberRange : dataRangeCollection) {
            long index = dataNumberRange.getStartIndex();
            long offset = (dataNumberRange.getEndIndex() - dataNumberRange.getStartIndex()) + 1;
            log.info("query start:{} offset：{}", index, offset);
            // Clone the parameters and scope them to this cursor window.
            MigrationDataParam sub = migrationCallRecordParam.clone();
            sub.setDataCursor(index);
            sub.setProcessDataUnit(offset);
            sub.setDataRangeType(migrationCallRecordParam.getDataRangeType());
            List dataList = fetchDataFunction(sub);
            if (CollectionUtil.isEmpty(dataList)) {
                continue;
            }
            // FIX: accumulate (was overwritten each iteration) so progress logging is correct.
            callCount += dataList.size();
            sub.setProcessRateValue(migrationCallRecordParam.getProcessDataUnit().intValue());
            int subResult = executeProcessResultCount(sub, dataList, success, failure);
            dateRangeSplitIntegerMap.put(dataNumberRange, subResult);
            log.info("--------------------------------【migration sub process】:" + " -> plan execute number:{} -> local do num :{}   ->  current finished num {} -> remain do num {}------------------------" + "-------- ", allCount, subResult, callCount, allCount - callCount);
            // FIX: removed the early `return subResult` here — it aborted after the
            // first non-empty range, defeating the split and making the summing
            // return below unreachable for multi-range migrations.
        }
        return dateRangeSplitIntegerMap.values().stream().mapToInt(param -> param).sum();
    }


    /**
     * Runs one batch: partitions {@code dataList}, submits the partitions to a
     * {@link CompletionService} backed by the configured (or lazily created
     * fallback) executor, then blocks until every partition completes and sums
     * the per-partition processed counts.
     *
     * @param migrationCallRecordParamSub parameters for this batch
     * @param dataList                    records to process
     * @param success                     shared success counter
     * @param failure                     shared failure counter
     * @return sum of per-partition processed counts (failed takes count as 0)
     */
    @SuppressWarnings("all")
    private Integer executeProcessResultCount(MigrationDataParam migrationCallRecordParamSub, List dataList, AtomicInteger success, AtomicInteger failure) {
        CompletionService completionService = new ExecutorCompletionService(Optional.ofNullable(getExecutorService()).orElseGet(AbstractMigrationDataService::lazyLoadTheSystemThreadPool));
        List dataSingleTaskData = processDataFunction(migrationCallRecordParamSub, completionService, dataList, success, failure);
        // Take exactly one completed result per submitted partition and sum them.
        return dataSingleTaskData.stream().map(param -> {
            try {
                return completionService.take().get();
            } catch (InterruptedException e) {
                // FIX: restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
                log.error("interrupt exception execute problem", e);
            } catch (ExecutionException e) {
                log.error("ExecutionException exception execute problem", e);
            } catch (Exception e) {
                // FIX: copy-pasted message claimed ExecutionException for any exception.
                log.error("unexpected exception execute problem", e);
            }
            return 0;
        }).mapToInt(param -> (int) param).sum();
    }


    /**
     * Async entry point: dispatches the migration to the strategy matching the
     * requested data-range type, tracking success/failure counts and elapsed time.
     *
     * @param migrationCallRecordParam migration request; itself and its dataRangeType must be non-null
     * @throws UnsupportedOperationException never propagated — caught and logged for unknown range types
     */
    @Async
    @Override
    public void migrationDataProcess(MigrationDataParam migrationCallRecordParam) {
        Objects.requireNonNull(migrationCallRecordParam, "no defined the migrationCallRecordParam!");
        Objects.requireNonNull(migrationCallRecordParam.getDataRangeType(), "no defined the data range type value");
        AtomicInteger success = new AtomicInteger(0);
        AtomicInteger failure = new AtomicInteger(0);
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        int actualCount = 0;
        try {
            // Strategy dispatch on the requested range type.
            switch (migrationCallRecordParam.getDataRangeType()) {
                case MIGRATION_DATE_RANGE:
                    actualCount = migrationDataForTargetDateRange(migrationCallRecordParam, success, failure);
                    break;
                case MIGRATION_TARGET_IDS:
                    actualCount = migrationDataForTargetIds(migrationCallRecordParam, success, failure);
                    break;
                case MIGRATION_DATA_RANGE:
                    actualCount = migrationDataForDataRange(migrationCallRecordParam, success, failure);
                    break;
                default:
                    throw new UnsupportedOperationException("not support the type");
            }
        } catch (UnsupportedOperationException e) {
            log.error("UnsupportedOperationException", e);
        } catch (IllegalStateException e) {
            log.error("IllegalStateException", e);
        } finally {
            // FIX: stop in finally so elapsed time is recorded even when the
            // migration throws (previously the watch was left running on error).
            if (stopWatch.isRunning()) {
                stopWatch.stop();
            }
            finalizeProcess(migrationCallRecordParam);
            // FIX: corrected "acutual"/"faliure" typos in the summary message.
            log.info("--------------------------------【migration - final 】: plan execute number:{} - actual number ：{} - failure ：{} - cost time:{} 秒 --------------------------------", migrationCallRecordParam.getAllCount(), actualCount, failure.get(), stopWatch.getTotalTimeSeconds());
        }
    }

    /**
     * Forwards a processed record to the next pipeline stage. The default
     * implementation enqueues into a partitioned Redis blocking queue; subclasses
     * may override to target Kafka or another sink. Records sharing a slink name
     * land in the same queue and are therefore consumed serially.
     *
     * @param migrationProcessResponse result of the previous step; ignored when null
     */
    protected void processNextStepSlink(MigrationProcessResponse migrationProcessResponse) {
        if (Objects.isNull(migrationProcessResponse)) {
            return;
        }
        try {
            Long currentQueueSize = redisStatisticsManager.addAutoPartitionBlockingElement(
                    migrationProcessResponse.getOutputStreamSlinkName(),
                    migrationProcessResponse.getEntity(), 4);
            log.info("current queue size:{}", currentQueueSize);
        } catch (Exception e) {
            log.error("do execute next step process failure!",e);
        }
    }

    /**
     * Partitions the source list into consecutive sub-lists of at most
     * {@code processRateValue} elements each (the final sub-list may be shorter).
     *
     * @param list               source data to partition
     * @param migrationDataParam supplies the partition size via processRateValue
     * @return consecutive sub-list views over the source list
     */
    protected List<List> splitDataPartitionNumber(List list, MigrationDataParam migrationDataParam) {
        int partitionSize = migrationDataParam.getProcessRateValue();
        return Lists.partition(list, partitionSize);
    }

    /**
     * Fetches the batch of records matching the given (sub-)query parameters.
     * Implementations back this with the concrete data source.
     *
     * @param migrationCallRecordParamSub query parameters for one shard / sub-range
     * @return matching records; callers treat null or empty as "nothing to do"
     */
    protected abstract List fetchDataFunction(MigrationDataParam migrationCallRecordParamSub);

    /**
     * Returns the total record count for the given parameters, used as the basis
     * for overall progress computation. Optional capability: callers substitute
     * zero when this returns null.
     *
     * @param migrationCallRecordParamSub query parameters describing the full range
     * @return total matching record count, or null when not supported
     */
    protected abstract Long fetchDataAllCount(MigrationDataParam migrationCallRecordParamSub);

    /**
     * Processes a single entity of the migration batch.
     *
     * @param migrationCallRecordParam parameters for the current shard
     * @param dataList                 the complete batch the entity belongs to
     * @param entity                   the record currently being processed
     * @param success                  shared success counter (implementations may update it)
     * @param failure                  shared failure counter (implementations may update it)
     * @return response forwarded to the next pipeline stage; may be null to skip forwarding
     */
    protected abstract MigrationProcessResponse executeDataFunction(MigrationDataParam migrationCallRecordParam, List dataList, Object entity, AtomicInteger success, AtomicInteger failure);




    /**
     * Ceiling-division helper: number of chunks of size {@code splitNumber}
     * needed to cover {@code number}. Note: always returns at least 1, even for
     * {@code number <= splitNumber} (including zero or negative inputs) — this
     * matches the original contract, so 0 never yields 0 chunks.
     *
     * @param number      total element count
     * @param splitNumber chunk size
     * @return chunk count, minimum 1
     */
    private static int ceilDiv(long number, long splitNumber) {
        if (number <= splitNumber) {
            return 1;
        }
        long quotient = number / splitNumber;
        // Round up when the division leaves a remainder.
        return (int) (number % splitNumber > 0 ? quotient + 1 : quotient);
    }


    /**
     * Ad-hoc demo comparing CompletionService submission with plain
     * ExecutorService submission.
     *
     * Fixes over the original demo:
     * - the pool (core 1, max 2, queue 2) received 5 submissions, so the 5th was
     *   rejected with RejectedExecutionException; the queue is sized to fit now;
     * - the executor is shut down at the end so its non-daemon worker threads no
     *   longer keep the JVM alive forever;
     * - five copy-pasted identical lambdas replaced with one shared task, and the
     *   unused second ThreadPoolExecutor removed.
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = ThreadUtil.newExecutor(1, 2, 8);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        // One shared demo task: sleep, print, return 1.
        Callable<Integer> demoTask = () -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("1213");
            return 1;
        };

        // Two submissions through the completion service...
        for (int i = 0; i < 2; i++) {
            completionService.submit(demoTask);
        }
        // ...and three directly on the executor, as in the original demo.
        for (int i = 0; i < 3; i++) {
            executorService.submit(demoTask);
        }

        // Let the demo finish and release the worker threads.
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }




}
